package com.xl.competition.modul_b.task2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object ODSToDWDTest1 {

  /**
   * ODS -> DWD incremental merge for `user_info`.
   *
   * Reads the latest ODS partition, unions it with the current DWD dimension
   * data, keeps only the newest record per `id` (by `operate_time` desc),
   * preserves the earliest `dwd_insert_time` across versions of a record, and
   * overwrites the target DWD partition.
   *
   * Requires a reachable Hive metastore / HDFS; runs Spark in local mode.
   */
  def main(args: Array[String]): Unit = {
    // Silence Spark's INFO/WARN chatter so only errors surface.
    Logger.getLogger("org").setLevel(Level.ERROR)

    val warehouse = "hdfs://hadoop102:9000/hive/warehouse/home"
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("read mysql write hive")
    //.set("spark.testing.memory", "471859200")

    val sparkSession: SparkSession = SparkSession
      .builder()
      .enableHiveSupport()
      .config(conf)
      .config("spark.sql.warehouse.dir", warehouse)
      .getOrCreate()

    // Current audit timestamp, formatted then cast back so it is truncated
    // to whole seconds ("yyyy-MM-dd HH:mm:ss") as a timestamp column.
    val auditTime = date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").cast("timestamp")

    // Latest ODS partition, annotated with the four DWD audit columns.
    // `create_time` is normalized to a yyyyMMdd string; the partition
    // column `etldate` is dropped before the union.
    val ods: DataFrame = sparkSession.sql("select * from ods.user_info where etldate = '20220719'")
      .withColumn("dwd_insert_user", lit("user1"))
      .withColumn("create_time", date_format(col("create_time"), "yyyyMMdd"))
      .withColumn("dwd_insert_time", auditTime)
      .withColumn("dwd_modify_user", lit("user1"))
      .withColumn("dwd_modify_time", auditTime)
      .drop("etldate")

    // Existing DWD data for the same partition (check via hive cli which
    // partition is actually the latest); drop its partition column too so
    // both sides have identical schemas for unionByName.
    val dwd = sparkSession.sql("select * from dwd.dim_user_info where etl_date = '20220719'").drop("etl_date")

    // Union old and new versions of each record; backfill a null
    // operate_time with create_time so the ordering below is total.
    ods.unionByName(dwd)
      .withColumn("operate_time", when(col("operate_time").isNull, col("create_time"))
        .otherwise(col("operate_time")))
      .createTempView("v")

    // Debug aid: dump the merged view (both ODS and DWD rows are present here).
    sparkSession.sql("select * from v").show(false)

    // Per id: rank versions newest-first, and compute the earliest
    // dwd_insert_time so the surviving row keeps the original insert time.
    sparkSession.sql(
      """
        |select *,
        | row_number() over (partition by v.id order by operate_time desc) as operate_time_num,
        | min(dwd_insert_time) over (partition by v.id) as min_dwd_insert_time
        | from v
        |""".stripMargin)
      .withColumn("dwd_insert_time", when(col("operate_time_num") === 1, col("min_dwd_insert_time"))
        .otherwise(col("dwd_insert_time")))
      .filter(col("operate_time_num") === 1) // keep only the newest version of each id
      .drop("operate_time_num", "min_dwd_insert_time") // drop the helper columns
      .withColumn("birthday", col("birthday").cast("timestamp")) // align type with the DWD schema
      .createTempView("v_result")

    // BUG FIX: the original statement was
    //   insert overwrite table ... partition (...) from (select * from v_result)
    // which is not valid Spark SQL (INSERT ... FROM with no select list throws
    // a ParseException). INSERT OVERWRITE with a static partition takes a
    // SELECT query directly.
    sparkSession.sql(
      """
        |insert overwrite table dwd.dim_user_info partition (etl_date='20220719')
        |select * from v_result
        |""".stripMargin)

    sparkSession.stop()
  }
}
